DataX 是阿里云开源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、MaxCompute(原ODPS)、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
Datax 的数据转换支持 UserDefined Function
官方的使用说明如下:
dx_groovy
- 参数。
- 第一个参数: groovy code
- 第二个参数(列表或者为空):extraPackage
- 备注:
- dx_groovy只能调用一次。不能多次调用。
- groovy code中支持java.lang, java.util的包,可直接引用的对象有record,以及element下的各种column(BoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.class,StringColumn.class)。不支持其他包,如果用户有需要用到其他包,可设置extraPackage,注意extraPackage不支持第三方jar包。
- groovy code中,返回更新过的Record(比如record.setColumn(columnIndex, new StringColumn(newValue));),或者null。返回null表示过滤此行。
- 用户可以直接调用静态的Util方式(GroovyTransformerStaticUtil),目前GroovyTransformerStaticUtil的方法列表 (按需补充):
- 举例:1234567groovy 实现的subStr:String = "Column column = record.getColumn(1);\n" +" String oriValue = column.asString();\n" +" String newValue = oriValue.substring(0, 3);\n" +" record.setColumn(1, new StringColumn(newValue));\n" +" return record;";dx_groovy(record);
|
|
|
|
- 以下是我自己实现的字符串替换 dx_groovy 函数
{
“job”: {
}"content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": ["user_role_id","username","ROLE"], "connection": [ { "jdbcUrl": ["jdbc:mysql://10.1.18.155:3306/saiku_test"], "table": ["user_roles_copy"] } ], "password": "123456", "username": "root", "where": "" } }, "writer": { "name": "mysqlwriter", "parameter": { "column": ["user_role_id","username","ROLE"], "connection": [ { "jdbcUrl": "jdbc:mysql://10.1.18.155:3306/saiku_test", "table": ["user_roles_trans"] } ], "password": "123456", "preSql": [], "session": [], "username": "root", "writeMode": "insert" } }, "transformer": [ { "name": "dx_filter", "parameter": { "columnIndex":1, "paras":["=","null"] } },{ "name": "dx_groovy", "parameter": { "code": "Column column = record.getColumn(2);\nString oriValue = column.asString();\nString sourceString = \"ROLE_ADMIN\";\nString changeString = \"replaceTest\";\nif (oriValue.equals(sourceString)){record.setColumn(2, new StringColumn(changeString));\nreturn record;}\nreturn record;", "extraPackage":[] } } ] } ], "setting": { "speed": { "channel": "1" } }
}